package org.zjvis.datascience.service;

import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.zjvis.datascience.common.dto.datax.DataXDTO;
import org.zjvis.datascience.common.util.RedisUtil;

/**
 * Stores DataX job definitions as JSON files under the configured directory and runs them by
 * invoking the configured shell script.
 *
 * @description DataX Service
 * @date 2021-07-21
 */
@Service
public class DataXService {

    /** Exit code reported when the command could not be started or waited for. */
    private static final int LAUNCH_FAILED = -1;

    /** Directory in which the job JSON files are written and looked up. */
    @Value("${dataX.filePath}")
    private String filePath;
    /** Path of the shell script that is run through {@code sh}. */
    @Value("${commandPath}")
    private String commandPath;
    /**
     * First entry of the child process environment. {@link Runtime#exec(String[], String[])}
     * expects each entry in {@code name=value} form.
     */
    @Value("${dataX.pythonHome}")
    private String pythonHome;
    /** Second entry of the child process environment, same format as {@link #pythonHome}. */
    @Value("${dataX.pythonPath}")
    private String pythonPath;

    private final static Logger log = LoggerFactory.getLogger("DataXService");

    /**
     * Writes the job JSON carried by {@code dataXDTO} to {@code <filePath>/<fileName>.json},
     * creating the target directory first when it does not exist.
     *
     * @param dataXDTO supplies the file name (without extension) and the JSON text to store
     * @return {@code "success"} when the file was written, otherwise a message starting with
     *         {@code "dataX upload error! "} followed by the I/O error text
     */
    public String uploadDataX(DataXDTO dataXDTO) {
        File targetDir = new File(filePath);
        if (!targetDir.exists()) {
            targetDir.mkdirs();
        }
        // NOTE(review): the file name is concatenated into the path unvalidated - confirm that
        // callers reject path separators and "..".
        String jobFilePath = filePath + "/" + dataXDTO.getFileName() + ".json";
        // try-with-resources closes (and thereby flushes) the writer even when write() fails.
        try (FileWriter writer = new FileWriter(jobFilePath)) {
            writer.write(dataXDTO.getDataXJson());
        } catch (IOException e) {
            log.error("Exception! " + e.getMessage(), e);
            // Report the failure instead of claiming success for a file that was not written.
            return "dataX upload error! " + e.getMessage();
        }
        return "success";
    }

    /**
     * Runs {@code sh <commandPath> <fileName>} for a previously uploaded job file and waits for
     * the command to finish. The child process environment consists of exactly
     * {@link #pythonHome} and {@link #pythonPath}.
     *
     * @param fileName job file name without the {@code .json} extension
     * @return {@code "file not exist!"} when {@code <filePath>/<fileName>.json} is missing,
     *         {@code "success"} when the command exits with code 0, otherwise
     *         {@code "dataX execute error! code = <code>"}; the code is {@code -1} when the
     *         command could not be started or waiting for it failed
     */
    public String executeDataX(String fileName) {
        String completeFilePath = filePath + "/" + fileName + ".json";
        if (!new File(completeFilePath).exists()) {
            return "file not exist!";
        }
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.info("execute dataX ! fileName = " + fileName + " time = " + df.format(new Date()));

        // Pass the arguments as an array so a file name containing whitespace stays a single
        // argument. commandPath is treated as one path, not as a command line.
        String[] command = {"sh", commandPath, fileName};
        log.info("command: " + String.join(" ", command));
        String[] environment = {pythonHome, pythonPath};

        // Start from the failure code so an exception can never be reported as success.
        int code = LAUNCH_FAILED;
        try {
            final Process process = Runtime.getRuntime().exec(command, environment);
            // Both pipes must be consumed while the process runs; otherwise the child blocks
            // once a pipe buffer fills up and waitFor() never returns.
            CompletableFuture<String> errorOutput =
                    CompletableFuture.supplyAsync(() -> readAll(process.getErrorStream()));
            discard(process.getInputStream());
            code = process.waitFor();
            String result = errorOutput.join();
            if (code != 0) {
                log.error("Command execute Error! reason = " + result);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
            log.error("Exception! " + e.getMessage(), e);
        } catch (Exception e) {
            log.error("Exception! " + e.getMessage(), e);
        }
        return code == 0 ? "success" : "dataX execute error! code = " + code;
    }

    /** Reads the whole stream as UTF-8 text; returns an empty string when reading fails. */
    private static String readAll(InputStream stream) {
        try {
            return IOUtils.toString(stream, "utf-8");
        } catch (IOException e) {
            log.error("Exception! " + e.getMessage(), e);
            return "";
        }
    }

    /** Consumes the stream until end-of-file and throws the bytes away. */
    private static void discard(InputStream stream) {
        byte[] buffer = new byte[4096];
        try {
            while (stream.read(buffer) != -1) {
                // Nothing to keep; reading only stops the pipe from filling up.
            }
        } catch (IOException e) {
            log.error("Exception! " + e.getMessage(), e);
        }
    }
}
